Программирование сетевых приложений

Организация потоков, параллельной обработки, синхронизации и распределенной обработки синхронизуемых участков кода

Программирование сетевых приложений

Содержание лекции

  • Общее представление о потоках
  • Создание, остановка и соединение потоков
  • Планирование потоков и управление ими
  • Синхронизация потоков
  • Синхронизированные методы и блоки операторов
  • Тупики и методы их предотвращения
  • Коммуникация между потоками
  • Примеры на C++ и Qt
Организация потоков и синхронизация
Программирование сетевых приложений

Введение в многопоточность

Многопоточность - это механизм, позволяющий программе выполнять несколько операций одновременно. В современных приложениях потоки используются для повышения производительности, отзывчивости интерфейса и эффективного использования многоядерных процессоров.

Организация потоков и синхронизация
Программирование сетевых приложений

Основные понятия

  • Поток (thread) - наименьшая единица обработки, которую может запланировать операционная система
  • Процесс (process) - экземпляр выполняющейся программы с собственным адресным пространством
  • Параллелизм - одновременное выполнение нескольких вычислений
  • Синхронизация - координация выполнения потоков для предотвращения конфликтов
Организация потоков и синхронизация
Программирование сетевых приложений

Создание потоков в C++

#include <iostream>
#include <thread>
#include <chrono>

void workerFunction(int id) {
    std::cout << "Поток " << id << " начал работу" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Поток " << id << " завершил работу" << std::endl;
}

int main() {
    std::cout << "Главный поток: создание потоков" << std::endl;
    
    // Создание потоков
    std::thread thread1(workerFunction, 1);
    std::thread thread2(workerFunction, 2);
    
    // Ожидание завершения потоков
    thread1.join();
    thread2.join();
    
    std::cout << "Главный поток: все потоки завершены" << std::endl;
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Потоки в Qt

#include <QThread>
#include <QDebug>
#include <QCoreApplication>

class Worker : public QObject {
    Q_OBJECT
    
public slots:
    void doWork() {
        qDebug() << "Рабочий поток:" << QThread::currentThreadId();
        emit workFinished();
    }
    
signals:
    void workFinished();
};

class Controller : public QObject {
    Q_OBJECT
    
public:
    Controller() {
        Worker* worker = new Worker;
        QThread* workerThread = new QThread(this);
        
        // Перемещаем worker в новый поток
        worker->moveToThread(workerThread);
        
        // Связываем сигналы и слоты
        connect(workerThread, &QThread::started, worker, &Worker::doWork);
        connect(worker, &Worker::workFinished, workerThread, &QThread::quit);
        connect(worker, &Worker::workFinished, worker, &Worker::deleteLater);
        connect(workerThread, &QThread::finished, workerThread, &QThread::deleteLater);
        
        workerThread->start();
    }
};
Организация потоков и синхронизация
Программирование сетевых приложений

Наследование от QThread

#include <QThread>
#include <QDebug>

class CustomThread : public QThread {
    Q_OBJECT
    
protected:
    void run() override {
        qDebug() << "Поток начал выполнение:" << currentThreadId();
        
        for (int i = 0; i < 5; ++i) {
            qDebug() << "Итерация" << i << "в потоке" << currentThreadId();
            msleep(1000); // Пауза 1 секунда
        }
        
        qDebug() << "Поток завершил выполнение:" << currentThreadId();
    }
};

int main(int argc, char *argv[]) {
    QCoreApplication app(argc, argv);
    
    CustomThread thread;
    thread.start();
    thread.wait(); // Ожидание завершения потока
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Синхронизация потоков с использованием мьютексов

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::mutex mtx;
int sharedCounter = 0;

void incrementCounter(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        mtx.lock();
        ++sharedCounter;
        mtx.unlock();
    }
}

int main() {
    const int numThreads = 4;
    const int iterationsPerThread = 100000;
    
    std::vector<std::thread> threads;
    
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(incrementCounter, iterationsPerThread);
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Итоговое значение счетчика: " << sharedCounter << std::endl;
    std::cout << "Ожидаемое значение: " << numThreads * iterationsPerThread << std::endl;
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Блокировка с использованием std::lock_guard

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::mutex mtx;
int sharedCounter = 0;

void incrementCounter(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        ++sharedCounter;
        // Мьютекс автоматически освобождается при выходе из области видимости
    }
}

int main() {
    const int numThreads = 4;
    const int iterationsPerThread = 100000;
    
    std::vector<std::thread> threads;
    
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(incrementCounter, iterationsPerThread);
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Итоговое значение счетчика: " << sharedCounter << std::endl;
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Синхронизация в Qt с использованием QMutex

#include <QCoreApplication>
#include <QThread>
#include <QMutex>
#include <QDebug>
#include <QVector>

class Counter : public QObject {
    Q_OBJECT
    
private:
    int value;
    QMutex mutex;
    
public:
    Counter() : value(0) {}
    
public slots:
    void increment() {
        mutex.lock();
        int temp = value;
        QThread::msleep(1); // Имитация работы
        value = temp + 1;
        mutex.unlock();
    }
    
    int getValue() const {
        mutex.lock();
        int result = value;
        mutex.unlock();
        return result;
    }
};

class Worker : public QObject {
    Q_OBJECT
    
private:
    Counter* counter;
    
public:
    Worker(Counter* c) : counter(c) {}
    
public slots:
    void doWork() {
        for (int i = 0; i < 1000; ++i) {
            counter->increment();
        }
        emit workFinished();
    }
    
signals:
    void workFinished();
};
Организация потоков и синхронизация
Программирование сетевых приложений

Атомарные операции

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

std::atomic<int> atomicCounter(0);

void incrementAtomic(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        atomicCounter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    const int numThreads = 4;
    const int iterationsPerThread = 100000;
    
    std::vector<std::thread> threads;
    
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(incrementAtomic, iterationsPerThread);
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Атомарный счетчик: " << atomicCounter.load() << std::endl;
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Условные переменные

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> dataQueue;
bool finished = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        
        std::unique_lock<std::mutex> lock(mtx);
        dataQueue.push(i);
        std::cout << "Производитель: добавлен элемент " << i << std::endl;
        lock.unlock();
        
        cv.notify_one();
    }
    
    std::unique_lock<std::mutex> lock(mtx);
    finished = true;
    lock.unlock();
    cv.notify_all();
}

void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        
        cv.wait(lock, [] { return !dataQueue.empty() || finished; });
        
        if (!dataQueue.empty()) {
            int data = dataQueue.front();
            dataQueue.pop();
            lock.unlock();
            
            std::cout << "Потребитель " << id << ": получен элемент " << data << std::endl;
        } else if (finished) {
            break;
        }
    }
}
Организация потоков и синхронизация
Программирование сетевых приложений

Тупики (Deadlocks), несколько потоков ожидают друг друга:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mutex1;
std::mutex mutex2;

void threadFunction1() {
    std::lock_guard<std::mutex> lock1(mutex1);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::lock_guard<std::mutex> lock2(mutex2);
    std::cout << "Поток 1: получены оба мьютекса" << std::endl;
}

void threadFunction2() {
    std::lock_guard<std::mutex> lock2(mutex2);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::lock_guard<std::mutex> lock1(mutex1);
    std::cout << "Поток 2: получены оба мьютекса" << std::endl;
}

int main() {
    std::thread t1(threadFunction1);
    std::thread t2(threadFunction2);
    
    t1.join();
    t2.join();
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Предотвращение тупиков с std::lock

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mutex1;
std::mutex mutex2;

void threadFunction1() {
    std::unique_lock<std::mutex> lock1(mutex1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mutex2, std::defer_lock);
    
    std::lock(lock1, lock2); // Безопасная блокировка
    
    std::cout << "Поток 1: получены оба мьютекса" << std::endl;
}

void threadFunction2() {
    std::unique_lock<std::mutex> lock1(mutex1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mutex2, std::defer_lock);
    
    std::lock(lock1, lock2); // Безопасная блокировка
    
    std::cout << "Поток 2: получены оба мьютекса" << std::endl;
}

int main() {
    std::thread t1(threadFunction1);
    std::thread t2(threadFunction2);
    
    t1.join();
    t2.join();
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Коммуникация между потоками в Qt (сигналы и слоты)

#include <QCoreApplication>
#include <QThread>
#include <QDebug>
#include <QTimer>

class Worker : public QObject {
    Q_OBJECT
    
private:
    int counter;
    
public:
    Worker() : counter(0) {}
    
public slots:
    void process() {
        ++counter;
        emit progress(counter);
        
        if (counter >= 10) {
            emit finished();
        } else {
            QTimer::singleShot(1000, this, &Worker::process);
        }
    }
    
signals:
    void progress(int value);
    void finished();
};

class Controller : public QObject {
    Q_OBJECT
    
private:
    Worker* worker;
    QThread* workerThread;
    
public:
    Controller() {
        worker = new Worker;
        workerThread = new QThread(this);
        
        worker->moveToThread(workerThread);
        
        connect(workerThread, &QThread::started, worker, &Worker::process);
        connect(worker, &Worker::progress, this, &Controller::onProgress);
        connect(worker, &Worker::finished, this, &Controller::onFinished);
        connect(worker, &Worker::finished, workerThread, &QThread::quit);
        connect(workerThread, &QThread::finished, worker, &Worker::deleteLater);
        connect(workerThread, &QThread::finished, workerThread, &QThread::deleteLater);
        
        workerThread->start();
    }
    
public slots:
    void onProgress(int value) {
        qDebug() << "Прогресс:" << value;
    }
    
    void onFinished() {
        qDebug() << "Работа завершена";
        QCoreApplication::quit();
    }
};
Организация потоков и синхронизация
Программирование сетевых приложений

Потокобезопасные очереди

#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <iostream>

template<typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue;
    mutable std::mutex mtx;
    std::condition_variable cv;
    
public:
    void push(const T& item) {
        {
            std::unique_lock<std::mutex> lock(mtx);
            queue.push(item);
        }
        cv.notify_one();
    }
    
    bool pop(T& item) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return !queue.empty(); });
        
        if (queue.empty()) {
            return false;
        }
        
        item = queue.front();
        queue.pop();
        return true;
    }
    
    bool try_pop(T& item) {
        std::unique_lock<std::mutex> lock(mtx);
        
        if (queue.empty()) {
            return false;
        }
        
        item = queue.front();
        queue.pop();
        return true;
    }
    
    size_t size() const {
        std::unique_lock<std::mutex> lock(mtx);
        return queue.size();
    }
};
Организация потоков и синхронизация
Программирование сетевых приложений

Планирование потоков

Планировщик ОС решает, какой поток выполняется на каком ядре CPU.

Два основных подхода:

  • Вытесняющее (preemptive) — ОС принудительно переключает потоки (Windows, Linux). Каждый поток получает квант времени (time slice).
  • Кооперативное (cooperative) — потоки отдают управление добровольно (ранние системы, некоторые встраиваемые).

Алгоритмы планирования:

  • Round-Robin (циклический) — потоки по очереди, фиксированный квант
  • Priority-based (по приоритетам) — поток с наивысшим приоритетом выполняется первым
  • CFS (Completely Fair Scheduler, Linux) — красно-чёрное дерево, справедливое распределение CPU

Факторы, влияющие на планирование: приоритет, CPU affinity, I/O-ожидание, nice value

Организация потоков и синхронизация
Программирование сетевых приложений

Приоритеты потоков в Qt

QThread::Priority: IdlePriority, LowestPriority, LowPriority, NormalPriority, HighPriority, HighestPriority, TimeCriticalPriority, InheritPriority

Приоритет потока — это рекомендация для ОС, а не гарантия.

Некорректное использование приоритетов может привести к starvation (голоданию) низкоприоритетных потоков.

#include <QThread>

class NetworkWorker : public QThread {
protected:
    void run() override {
        // Установка приоритета
        setPriority(QThread::HighPriority);
        
        // Привязка к конкретному ядру CPU (платформо-зависимо)
#ifdef Q_OS_LINUX
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(0, &cpuset);
        pthread_setaffinity_np((pthread_t)handle(), sizeof(cpu_set_t), &cpuset);
#endif
        // ... работа ...
    }
};
Организация потоков и синхронизация
Программирование сетевых приложений

Заключение

Многопоточность - это мощный инструмент, который требует тщательного проектирования и понимания. Ключевые принципы:

  1. Минимизация общих ресурсов - чем меньше данных разделяется между потоками, тем проще обеспечить синхронизацию
  2. Использование высокоуровневых абстракций - Qt предоставляет мощные механизмы для работы с потоками
  3. Предотвращение тупиков - всегда блокировать мьютексы в одинаковом порядке
  4. Правильное завершение потоков - использовать механизмы сигналов/слотов для безопасного завершения
  5. Тестирование - многопоточные приложения требуют тщательного тестирования на наличие гонок и тупиков

Понимание этих концепций позволяет создавать эффективные и надежные многопоточные приложения на C++ с использованием Qt.

Организация потоков и синхронизация
Программирование сетевых приложений

Вопросы для самопроверки

  1. Какие существуют способы создания потоков в C++?
  2. Чем отличается std::thread от QThread?
  3. Какие механизмы синхронизации существуют в C++?
  4. Что такое тупик и как его предотвратить?
  5. Как организовать коммуникацию между потоками в Qt?
  6. Когда следует использовать атомарные операции вместо мьютексов?
  7. Какие преимущества дает использование потокобезопасных контейнеров?
Организация потоков и синхронизация

Заметки докладчика: - Многопоточность критически важна для сетевых серверов — без неё невозможно обслуживать нескольких клиентов одновременно. - Эта лекция короче других — используйте оставшееся время для более глубокого обсуждения тем и живых демонстраций.

Заметки докладчика: - std::thread запускается немедленно при создании объекта. - Всегда вызывайте join() или detach() — иначе при уничтожении объекта thread будет вызван std::terminate и программа аварийно завершится. - Используйте RAII-обёртку или std::jthread (C++20), который автоматически вызывает join() в деструкторе.

Заметки докладчика: - КРИТИЧЕСКОЕ ПРАВИЛО Qt: QObject существует только в одном потоке. QTcpSocket нельзя использовать из потока, отличного от потока его родителя. - Используйте moveToThread() для перемещения объектов между потоками. - Сигналы/слоты между разными потоками автоматически используют QueuedConnection — это безопасно.

Заметки докладчика: - Всегда держите мьютекс заблокированным минимальное время. - НИКОГДА не выполняйте ввод-вывод под блокировкой — это убивает производительность сетевого сервера. - Используйте std::lock_guard / std::unique_lock для исключительной безопасности (RAII). - В Qt: QMutex совместно с QMutexLocker.

Заметки докладчика: - condition_variable применяется для шаблона «производитель-потребитель» — особенно актуально при обработке сетевых пакетов. - Всегда используйте с unique_lock, а не с lock_guard (lock_guard не поддерживает разблокировку). - Возможны ложные пробуждения (spurious wakeups) — ВСЕГДА проверяйте условие в цикле, а не в if.

Заметки докладчика: - Классическая задача «обедающих философов» — основной пример взаимной блокировки. - Стратегии предотвращения: всегда блокировать мьютексы в одинаковом порядке, использовать std::lock (избегает дедлоков), применять std::scoped_lock (C++17). - В сетевом программировании дедлок = сервер перестаёт отвечать на запросы клиентов.

Заметки докладчика: - Планирование потоков — обязательный пункт учебной программы (тема 7). - Для сетевых серверов: I/O-bound потоки (ожидание сети) должны иметь нормальный приоритет, CPU-bound (обработка) — ниже, чтобы не блокировать сеть. - CFS (Completely Fair Scheduler) — алгоритм Linux, использует красно-чёрное дерево для учёта времени каждого потока. - Приоритеты Qt — это обёртка над приоритетами ОС. QThread::InheritPriority означает наследование от создающего потока. - На практике: не стоит злоупотреблять приоритетами. Лучше правильно проектировать архитектуру (thread pool, async I/O).

Заметки докладчика: Ожидаемые ответы: 1. std::thread, QThread (наследование), QObject + moveToThread. 2. std::thread — низкоуровневый API стандартной библиотеки; QThread — интеграция с циклом событий Qt, сигналы/слоты. 3. Мьютексы (std::mutex, QMutex), атомарные операции (std::atomic), условные переменные (condition_variable), семафоры. 4. Тупик — ситуация, когда потоки бесконечно ожидают ресурсы друг друга. Предотвращение: единый порядок блокировки, std::lock, std::scoped_lock. 5. Сигналы и слоты (QueuedConnection между потоками), общие данные с мьютексами, QMetaObject::invokeMethod. 6. Когда операция простая и не требует защиты нескольких связанных переменных (например, счётчик). Атомарные операции быстрее мьютексов. 7. Избавляют от необходимости вручную синхронизировать доступ; снижают вероятность ошибок гонки данных.